[Amazon SageMaker] クロマキー合成とアフィン変換で生成したデータセットで硬貨を検出してみました
1 はじめに
CX事業本部の平内(SIN)です。
精度の高い物体検出モデルを作成するためには、データセットが重要でが、物体検出モデルでは、画像だけでなく、アノテーション作業も必要となるため、大量に質の高いデータを用意するのは、結構課題のある話だと思います。
下記では、商品画像の背景を透過処理して、アフィン変換しながら背景と合成することでデータセット作成しました。
上では、背景透過処理は、手動で行いましたが、これも大変だということで、今回は、背景をグリーンバックで撮影し、クロマキー処理で、背景透過も自動化してみました。
最初に、今回の作業で、作成されたモデルで推論している様子です。クラスは、1円、5円、10円、50円、100円の5つです。
用意した画像は、各硬貨ごと表10枚、裏10枚の100枚で、そこから生成したデータセットは、画像が3,000枚、アノテーションが約69,000個となってます。
2 元画像から切り出し
元画像は、グリーンの布上に硬貨を並べて撮影しています。
撮影した画像をプレビューで開いて、1個づつ選択して、Comand+C Command+Nで切り出していきます。この切り出しが、そのままアノテーション情報になるので、比較的に丁寧に行ったほうが良いと思います。
切り出したファイルは、Command+S でそれぞれのファルダに連番で保存しました。
3 クロマキー処理
上段が、元画像で、下がクロマキーで背景を透過したものです。
コードは、以下のようになっています。
元画像をアルファチャンネル有りで読み込み、HSV色空有間に変換して、背景の色相だけカットしています。
ちなみに色相においてグリーンは、60°〜150°ぐらいですが、OpenCVでは、180度で指定するため、約1/2で指定しています。
※ 50〜79は、今回の画像で、綺麗に背景が抜ける色相ということで、調整した結果です。
""" クロマキー合成のための元画像の作成 背景が緑色の硬貨画像から、背景を透過した画像に変換する """ import os import cv2 import glob import numpy as np dirs = [ "1_back","1_front", "5_back","5_front", "10_back","10_front", "50_back","50_front", "100_back","100_front", ] input = "./data/coins_org" output = "./data/coins_transparent" for dir in dirs: in_dir = "{}/{}".format(input, dir) out_dir = "{}/{}".format(output, dir) os.makedirs(out_dir, exist_ok=True) files = glob.glob("{}/*.png".format(in_dir)) for file in files: print(file) # アルファチャンネル有り画像読込み img = cv2.imread(file, -1) # HSVに変換 hsv = cv2.cvtColor(img, cv2.COLOR_BGR2HSV) # クロマキーによるマスク生成 mask = cv2.inRange(hsv, (50, 0, 0), (79, 255, 255)) # マクス削除 img = cv2.bitwise_not(img, img, mask=mask) # 画像保存 basename = os.path.basename(file) cv2.imwrite("{}/{}".format(out_dir, basename), img)
4 アフィン変換と合成
背景を透過した画像をアフィン変換して、背景画像と合成しているようすです。
データ生成中には、アノテーション結果(バウンディングボックス)も確認のために表示していますが、データ画像には有りません。
以下が、処理しているコードです。生成されるデータセットは、Ground Truth形式となっています。
""" アフィン変換とクロマキー合成によりGround Truth形式のデータセットを作成する アフィン変換は、下記を参考にさせて頂きました。 https://github.com/aws-samples/smart-cooler/blob/master/_ml_model_package/synthetic-dataset.ipynb """ import json import glob import random import os import shutil import math import numpy as np import cv2 from PIL import Image # MAX = 3000 # 生成する画像数 MAX = 3000 # 生成する画像数 CLASS_NAME=["1en","5en","10en","50en","100en"] COLORS = [(0,0,175),(175,0,0),(0,175,0),(175,175,0),(0,175,175)] SIZE=[90, 90, 90 ,90, 90] BACK_PATH = "./data/backgrounds" PRODUCT_PATH = "./data/products" OUTPUT_PATH = "./data/outputGroundTruth" S3Bucket = "s3://ground_truth_dataset" manifestFile = "output.manifest" def euler_to_mat(yaw, pitch, roll): c, s = math.cos(yaw), math.sin(yaw) M = np.matrix([[ c, 0., s], [ 0., 1., 0.], [ -s, 0., c]]) c, s = math.cos(pitch), math.sin(pitch) M = np.matrix([[ 1., 0., 0.], [ 0., c, -s], [ 0., s, c]]) * M c, s = math.cos(roll), math.sin(roll) M = np.matrix([[ c, -s, 0.], [ s, c, 0.], [ 0., 0., 1.]]) * M return M def make_affine_transform(from_shape, to_shape, min_scale, max_scale, scale_variation=1.0, rotation_variation=1.0, translation_variation=1.0): from_size = np.array([[from_shape[1], from_shape[0]]]).T to_size = np.array([[to_shape[1], to_shape[0]]]).T scale = random.uniform((min_scale + max_scale) * 0.5 - (max_scale - min_scale) * 0.5 * scale_variation, (min_scale + max_scale) * 0.5 + (max_scale - min_scale) * 0.5 * scale_variation) roll = random.uniform(-1.0, 1.0) * rotation_variation pitch = random.uniform(-0.15, 0.15) * rotation_variation yaw = random.uniform(-0.15, 0.15) * rotation_variation M = euler_to_mat(yaw, pitch, roll)[:2, :2] h = from_shape[0] w = from_shape[1] corners = np.matrix([[-w, +w, -w, +w], [-h, -h, +h, +h]]) * 0.5 skewed_size = np.array(np.max(M * corners, axis=1) - np.min(M * corners, axis=1)) scale *= np.min(to_size / skewed_size) trans = (np.random.random((2,1)) - 0.5) * translation_variation trans = ((2.0 * trans) ** 5.0) / 2.0 trans = (to_size - skewed_size * scale) * trans center_to = to_size / 2. center_from = from_size / 2. M = euler_to_mat(yaw, pitch, roll)[:2, :2] M *= scale M = np.hstack([M, trans + center_to - M * center_from]) return M # アフィン変換 def transform(backImage, productImage, productSize): M = make_affine_transform( from_shape=productImage.shape, to_shape=backImage.shape, min_scale=0.8, max_scale=0.5, # rotation_variation=3.5, rotation_variation=2.5, scale_variation=0.5, # scale_variation=1.0, # translation_variation=0.98) translation_variation=1.0) # 1.0を超えられない object_topleft = tuple(M.dot(np.array((0, 0) + (1, ))).tolist()[0]) object_topright = tuple(M.dot(np.array((productSize, 0) + (1,))).tolist()[0]) object_bottomleft = tuple(M.dot(np.array((0,productSize) + (1,))).tolist()[0]) object_bottomright = tuple(M.dot(np.array((productSize, productSize) + (1,))).tolist()[0]) object_tups = (object_topleft, object_topright, object_bottomleft, object_bottomright) object_xmin = (min(object_tups, key=lambda item:item[0])[0]) object_xmax = (max(object_tups, key=lambda item:item[0])[0]) object_ymin = (min(object_tups, key=lambda item:item[1])[1]) object_ymax = (max(object_tups, key=lambda item:item[1])[1]) rect = ((int(object_xmin),int(object_ymin)),(int(object_xmax),int(object_ymax))) productImage = cv2.warpAffine(productImage, M, (backImage.shape[1], backImage.shape[0])) return productImage, rect # 背景と商品の合成 def margeImage(backImg, productImg): # PIL形式で重ねる back_pil = Image.fromarray(backImg) product_pil = Image.fromarray(productImg) back_pil.paste(product_pil, (0, 0), product_pil) return np.array(back_pil) # エフェクト(Gauss) def addGauss(img, level): return cv2.blur(img, (level * 2 + 1, level * 2 + 1)) # エフェクト(Noise) def addNoiseSingleChannel(single): diff = 255 - single.max() noise = np.random.normal(0, random.randint(1, 100), single.shape) noise = (noise - noise.min())/(noise.max()-noise.min()) noise= diff*noise noise= noise.astype(np.uint8) dst = single + noise return dst # エフェクト(Noise) def addNoise(img): img = img.astype('float64') img[:,:,0] = addNoiseSingleChannel(img[:,:,0]) img[:,:,1] = addNoiseSingleChannel(img[:,:,1]) img[:,:,2] = addNoiseSingleChannel(img[:,:,2]) return img.astype('uint8') # バウンディングボックス描画 def box(frame, rect, class_id): ((x1,y1),(x2,y2)) = rect label = "{}".format(CLASS_NAME[class_id]) img = cv2.rectangle(frame,(x1, y1), (x2, y2), COLORS[class_id],2) img = cv2.rectangle(img,(x1, y1), (x1 + 150,y1-20), COLORS[class_id], -1) cv2.putText(img,label,(x1+2, y1-2), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255,255,255), 1, cv2.LINE_AA) return img # Manifest生成クラス class Manifest: def __init__(self, class_name): self.__lines = '' self.__class_map={} for i in range(len(class_name)): self.__class_map[str(i)] = class_name[i] def appned(self, fileName, data, height, width): date = "0000-00-00T00:00:00.000000" line = { "source-ref": "{}/{}".format(S3Bucket, fileName), "boxlabel": { "image_size": [ { "width": width, "height": height, "depth": 3 } ], "annotations": [] }, "boxlabel-metadata": { "job-name": "xxxxxxx", "class-map": self.__class_map, "human-annotated": "yes", "objects": { "confidence": 1 }, "creation-date": date, "type": "groundtruth/object-detection" } } for i in range(data.max()): (_, rect, class_id) = data.get(i) ((x1,y1),(x2,y2)) = rect line["boxlabel"]["annotations"].append({ "class_id": class_id, "width": x2 - x1, "top": y1, "height": y2 - y1, "left": x1 }) self.__lines += json.dumps(line) + '\n' def get(self): return self.__lines # 背景画像生成クラス class Backgrounds: def __init__(self, backPath): self.__backPath = backPath def get(self): imagePath = random.choice(glob.glob(self.__backPath + '/*.png')) return cv2.imread(imagePath, cv2.IMREAD_UNCHANGED) # 商品画像生成クラス class Products: def __init__(self, productPath, class_name, size): self.__productPath = productPath self.__class_name = class_name self.__size = size def get(self, class_id): # 商品画像 class_name = self.__class_name[class_id] image_path = random.choice(glob.glob(self.__productPath + '/' + class_name + '/*.png')) product_image = cv2.imread(image_path, cv2.IMREAD_UNCHANGED) # 商品画像のサイズ size = self.__size[class_id] return (self.__resize(product_image, size), size, class_id) # 商品画像のサイズ調整 def __resize(self, img, size): org_h, org_w = img.shape[:2] imageArray = np.zeros((org_h, org_w, 4), np.uint8) img = cv2.resize(img, (size, size)) imageArray[0:size, 0:size] = img return imageArray # 1画像分のデータを保持するクラス class Data: def __init__(self, rate): self.__rects = [] self.__images = [] self.__class_ids = [] self.__rate = rate def get_class_ids(self): return self.__class_ids def max(self): return len(self.__rects) def get(self, i): return (self.__images[i], self.__rects[i], self.__class_ids[i]) # 追加(重複率が指定値以上の場合は失敗する) def append(self, productImage, rect, class_id): conflict = False for i in range(len(self.__rects)): iou = self.__multiplicity(self.__rects[i], rect) if(iou > self.__rate): conflict = True break if(conflict == False): self.__rects.append(rect) self.__images.append(productImage) self.__class_ids.append(class_id) return True return False # 重複率 def __multiplicity(self, a, b): (ax_mn, ay_mn) = a[0] (ax_mx, ay_mx) = a[1] (bx_mn, by_mn) = b[0] (bx_mx, by_mx) = b[1] a_area = (ax_mx - ax_mn + 1) * (ay_mx - ay_mn + 1) b_area = (bx_mx - bx_mn + 1) * (by_mx - by_mn + 1) abx_mn = max(ax_mn, bx_mn) aby_mn = max(ay_mn, by_mn) abx_mx = min(ax_mx, bx_mx) aby_mx = min(ay_mx, by_mx) w = max(0, abx_mx - abx_mn + 1) h = max(0, aby_mx - aby_mn + 1) intersect = w*h return intersect / (a_area + b_area - intersect) # 各クラスのデータ数が同一になるようにカウントする class Counter(): def __init__(self, max): self.__counter = np.zeros(max) def get(self): n = np.argmin(self.__counter) self.__counter[n] += 1 return int(n) def print(self): print(self.__counter) def main(): # 出力先の初期化 if os.path.exists(OUTPUT_PATH): shutil.rmtree(OUTPUT_PATH) os.mkdir(OUTPUT_PATH) backgrounds = Backgrounds(BACK_PATH) products = Products(PRODUCT_PATH, CLASS_NAME, SIZE) manifest = Manifest(CLASS_NAME) counter = Counter(len(CLASS_NAME)) no = 0 while(True): # 背景画像の取得 backImage = backgrounds.get() # 商品データ data = Data(0.15) for _ in range(250): class_id = counter.get() # class_id = random.randint(0, len(CLASS_NAME)-1) # 商品画像の取得 product_image, product_size, class_id = products.get(class_id) # アフィン変換 product_image, rect = transform(backImage, product_image, product_size) # 商品の追加(重複した場合は、失敗する) ret = data.append(product_image, rect, class_id) if(ret): print("ret:{} rect:{}".format(ret, rect)) print("max:{}".format(data.max())) frame = backImage for index in range(data.max()): (product_image, _, _) = data.get(index) # 合成 frame = margeImage(frame, product_image) # アルファチャンネル削除 frame = cv2.cvtColor(frame, cv2.COLOR_BGRA2BGR) # エフェクト(gauss) frame = addGauss(frame, random.randint(0, 2)) # エフェクト(Noise) frame = addNoise(frame) # 画像名 fileName = "{:04d}.png".format(no) no+=1 # 画像保存 cv2.imwrite("{}/{}".format(OUTPUT_PATH, fileName), frame) # manifest追加 manifest.appned(fileName, data, frame.shape[0], frame.shape[1]) for i in range(data.max()): (_, rect, class_id) = data.get(i) # バウンディングボックス描画(確認用) frame = box(frame, rect, class_id) counter.print() if(MAX <= no): break # 表示(確認用) cv2.imshow("frame", frame) cv2.waitKey(1) # manifest 保存 with open('{}/{}'.format(OUTPUT_PATH, manifestFile), 'w') as f: f.write(manifest.get()) main()
5 RecordIO形式への変換
Ground Truth形式のデータをRecordIO形式に変換しています。生成時に各クラスの出力分布が均等になるように調整されているので、ここでは、単純に80%を訓練用、残りを検証用にしています。
""" GroundTruth形式のデータセットからRecordIO形式のデータセットを作成する """ import json import os import subprocess # 定義 inputPath = './data/outputGroundTruth' outputPath = './data/outputRecordIO' inputPath = '/private/tmp/coin/data/outputGroundTruth' outputPath = '/private/tmp/coin/data/outputRecordIO' manifest = 'output.manifest' CLASS_NAME=["1en","5en","10en","50en","100en"] # 分割の比率 ratio = 0.8 # 8:2に分割する # 1件のデータを表現するクラス class Data(): def __init__(self, src): # プロジェクト名の取得 for key in src.keys(): index = key.rfind("-metadata") if(index!=-1): projectName = key[0:index] # メタデータの取得 metadata = src[projectName + '-metadata'] class_map = metadata["class-map"] # 画像名の取得 self.__imgFileName = os.path.basename(src["source-ref"]) # 画像サイズの取得 project = src[projectName] image_size = project["image_size"] self.__img_width = image_size[0]["width"] self.__img_height = image_size[0]["height"] self.__annotations = [] # アノテーションの取得 for annotation in project["annotations"]: class_id = annotation["class_id"] top = annotation["top"] left = annotation["left"] width = annotation["width"] height = annotation["height"] self.__annotations.append({ "label": class_map[str(class_id)], "width": width, "top": top, "height": height, "left": left }) @property def annotations(self): return self.__annotations # 指定されたラベルを含むかどうか def exsists(self, label): for annotation in self.__annotations: if(annotation["label"] == label): return True return False # .lstを生成して追加する def appendLst(self, lst, cls_list): index = len(lst.split('\n')) headerSize = 4 # hederSize,dataSize,imageWidth,imageHeight dataSize = 5 str = "{}\t{}\t{}\t{}\t{}".format(index, headerSize, dataSize, self.__img_width, self.__img_height) for annotation in self.__annotations: cls_id = cls_list.index(annotation["label"]) left = annotation["left"] right = left + annotation["width"] top = annotation["top"] bottom = top + annotation["height"] left = round(left / self.__img_width, 3) right = round(right / self.__img_width, 3) top = round(top / self.__img_height, 3) bottom = round(bottom / self.__img_height, 3) str += "\t{}\t{}\t{}\t{}\t{}".format(cls_id, left, top, right, bottom) fileName = self.__imgFileName str += "\t{}".format(fileName) lst += str + "\n" return lst # 全てのJSONデータを読み込む def getDataList(inputPath, manifest): dataList = [] with open("{}/{}".format(inputPath, manifest), 'r') as f: srcList = f.read().split('\n') for src in srcList: if(src != ''): json_src = json.loads(src) dataList.append(Data(json.loads(src))) return dataList def main(): # 出力先フォルダ生成 os.makedirs(outputPath, exist_ok=True) # 全てのJSONデータを読み込む dataList = getDataList(inputPath, manifest) total_len = len(dataList) train_len = int(total_len * ratio) print("全データ: {}件 train: {} validation: {}".format(total_len, train_len, total_len-train_len)) # .lst形式 train = '' validation = '' for i in range(train_len): data = dataList.pop() train = data.appendLst(train, CLASS_NAME) for data in dataList: validation = data.appendLst(validation, CLASS_NAME) # .lstファイルの生成 trainLst = "{}/train.lst".format(outputPath) validationLst = "{}/validation.lst".format(outputPath) with open(trainLst, mode='w') as f: f.write(train) with open(validationLst, mode='w') as f: f.write(validation) # im2rec.pyによるRecordIOファイル生成 # python im2rec.py --pack-label <your_lst_file_name> <your_image_folder> im2rec = "{}/im2rec.py".format(os.getcwd()) cmd = ["python3", im2rec, "--pack-label", "validation.lst", inputPath] result = subprocess.run(cmd, cwd=outputPath) print(result) cmd = ["python3", im2rec, "--pack-label", "train.lst", inputPath] result = subprocess.run(cmd, cwd=outputPath) print(result) main()
6 モデル
ml.p2.16xlargeで40分ぐらいかかってます。
参考のために、設定したパラメータです。
early_stoppingでepoch 38で止まりました。
epoch mAP smooth_l1 cross_entropy ----------------------------------------- 0 0.155 0.532 1.624 5 0.899 0.169 0.518 10 0.948 0.129 0.399 15 0.953 0.111 0.355 20 0.96 0.101 0.329 25 0.961 0.095 0.314 30 0.962 0.088 0.298 31 0.963 0.088 0.297 32 0.962 0.088 0.293 33 0.963 0.087 0.292 34 0.963 0.085 0.289 35 0.962 0.084 0.287 36 0.963 0.083 0.285 37 0.963 0.083 0.284 38 0.963 0.081 0.279
最後に、推論しているコードです。
from boto3.session import Session import json import cv2 profile = 'developer' endPoint = 'sampleEndPoint' categories = ["1en","5en","10en","50en","100en"] tanka = [1,5,10,50,100] deviceId = 1 # Webカメラ height = 600 width = 800 linewidth = 2 colors = [(0,0,175),(175,0,0),(0,175,0),(175,175,0),(0,175,175)] class SageMaker(): def __init__(self, profile, endPoint): self.__endPoint = endPoint session = Session(profile_name = profile) self.__client = session.client('sagemaker-runtime') def invoke(self, image): data = self.__client.invoke_endpoint( EndpointName = self.__endPoint, Body = image, ContentType='image/jpeg' ) results = data['Body'].read() return json.loads(results) sageMake = SageMaker(profile, endPoint) cap = cv2.VideoCapture(deviceId) cap.set(cv2.CAP_PROP_FRAME_WIDTH, width) cap.set(cv2.CAP_PROP_FRAME_HEIGHT, height) fps = cap.get(cv2.CAP_PROP_FPS) width = cap.get(cv2.CAP_PROP_FRAME_WIDTH) height = cap.get(cv2.CAP_PROP_FRAME_HEIGHT) print("FPS:{} WIDTH:{} HEIGHT:{}".format(fps, width, height)) while True: # カメラ画像取得 ret, frame = cap.read() if(frame is None): continue _, jpg = cv2.imencode('.jpg', frame) detections = sageMake.invoke(jpg.tostring()) total = 0 for detection in detections["prediction"]: clsId = int(detection[0]) confidence = detection[1] x1 = int(detection[2] * width) y1 = int(detection[3] * height) x2 = int(detection[4] * width) y2 = int(detection[5] * height) label = "{} {:.2f}".format(categories[clsId], confidence) if(confidence > 0.6): frame = cv2.rectangle(frame,(x1, y1), (x2, y2), colors[clsId],linewidth) frame = cv2.rectangle(frame,(x1, y1), (x1 + 100,y1-20), colors[clsId], -1) cv2.putText(frame,label,(x1+2, y1-2), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255,255,255), 1, cv2.LINE_AA) total += tanka[clsId] cv2.putText(frame,"TOTAL: {:,}.-".format(total),(10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255,255,255), 1, cv2.LINE_AA) cv2.imshow('frame', frame) cv2.waitKey(1) cap.release() cv2.destroyAllWindows()
7 最後に
今回は、簡単に効果が確認できるように、表裏の2面しかない硬貨を対象に、クロマキー処理による背景透過を試してみました。
今後、もっと立体的な物体を回転台などに載せて撮影し、クロマキー処理で自動で背景透過出来ないかと考えています。
8 参考にさせて頂いたリンク
OpenCV – マスク画像を利用した画像処理について
RPi + Python + OpenCV その3「クロマキー」
色空間の変換